TaskGroup 是一種在 Airflow 中用來組織和管理多個 Task 的方法,透過將任務分類成不同群組,讓我們 DAG 當中的任務(Tasks)不會雜亂無章,能夠更容易地設計、維護和監控整個工作流程,可以想像成是網頁書籤資料夾,方便我們整理多個單獨的網頁。
優點:可重複使用的輪子、清晰組織架構、維護性好
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id = 'task_group_dag',
start_date = datetime(2023, 9, 26),
schedule_interval=None
) as dag:
可以使用 TaskGroup
的 Class 創建群組
with DAG(
...
) as dag:
with TaskGroup(group_id='my_task_group') as my_task_group:
with TaskGroup...:
task_1 = DummyOperator(task_id='task_1', dag=my_task_group)
task_2 = DummyOperator(task_id='task_2', dag=my_task_group)
task_3 = DummyOperator(task_id='task_3', dag=my_task_group)
with TaskGroup
會在 with DAG(
的裡面,巢狀縮排的概念。with TaskGroup...:
task_1 = ...
...
task_1 >> task_2 >> task_3
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
start_task >> my_task_group >> end_task
task1
、task2
、task3
都包進 my_task_group
當中了start_task
,接著完成 my_task_group
當中任務,最後在執行 end_task
task1
、task2
、task3
我執行了兩次,第二次故意在 TaskGroup 中的 task2
標記 fail,可以看到整個 TaskGroup 的狀態(state)就會是 fail,所以當 Task 被綁再一起成為生命共同體 TaskGroup,他們就是要共同進退的,一個人失敗就整組失敗,沒有人可以當小組報告的冗員
from airflow import DAG
from datetime import datetime
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
with DAG(
dag_id = 'task_group_dag',
start_date = datetime(2023, 9, 26),
schedule_interval=None
) as dag:
with TaskGroup(group_id='my_task_group') as tg1:
task_1 = DummyOperator(task_id='task_1')
task_2 = DummyOperator(task_id='task_2')
task_3 = DummyOperator(task_id='task_3')
task_1 >> task_2 >> task_3
start_task = DummyOperator(task_id='start_task')
end_task = DummyOperator(task_id='end_task')
start_task >> tg1 >> end_task